package com.flow.framework.core.system.thread.pool.executor;

import com.flow.framework.common.error.SystemErrorCode;
import com.flow.framework.common.exception.CheckedException;
import com.flow.framework.common.util.verify.VerifyUtil;
import com.flow.framework.core.system.checker.SystemStatusChecker;
import com.flow.framework.core.system.thread.pool.factory.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;

import java.util.*;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.*;

/**
 * 系统状态检查执行器
 *
 * @author luoguopiao
 * @version 0.0.1
 * @date 2022/1/23
 */
@Slf4j
public class SystemStatusCheckerExecutor {

    private static final Object LOCK = new Object();

    /**
     * 监听器最大延时不能超过5分钟，避免系统运行过程中突然出现问题而无法感知
     */
    private static final int LISTENER_MAX_INITIAL_DELAY = 300000;

    /**
     * 监听器最小执行间隔不能超过5秒
     */
    private static final int LISTENER_MIN_PERIOD = 5000;

    /**
     * 用于调度任务超时监控的线程池
     */
    private volatile static ScheduledExecutorService scheduledExecutorService;

    private static final Map<SystemStatusChecker, ScheduledFuture<?>> CHECKER_AND_FUTURE_MAP = new ConcurrentHashMap<>();

    private static final Map<Integer, Set<String>> HEALTH_CHECK_CODE_AND_UNHEALTHY_TAGS_MAP = new ConcurrentHashMap<>();

    /**
     * 调度系统状态检查器
     *
     * @param systemStatusChecker 系统状态检查器
     */
    public static void schedule(SystemStatusChecker systemStatusChecker) {
        if (VerifyUtil.isEmpty(systemStatusChecker)) {
            return;
        }
        long initialDelay = systemStatusChecker.getInitialDelay();
        long period = systemStatusChecker.getPeriod();
        if (initialDelay > LISTENER_MAX_INITIAL_DELAY || period < LISTENER_MIN_PERIOD) {
            log.error("system status check execute params over limit, initial delay : {}, period: {}", initialDelay, period);
            throw new CheckedException(SystemErrorCode.SYSTEM_NOT_SUPPORT_ERROR);
        }

        if (VerifyUtil.isEmpty(scheduledExecutorService)) {
            synchronized (LOCK) {
                if (VerifyUtil.isEmpty(scheduledExecutorService)) {
                    scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
                            new DefaultThreadFactory("system_status_listener_"),
                            new ThreadPoolExecutor.AbortPolicy());
                }
            }
        }

        ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                Set<String> unhealthyTags = systemStatusChecker.asyncHealthCheck();
                Set<String> currentUnhealthyTags = HEALTH_CHECK_CODE_AND_UNHEALTHY_TAGS_MAP
                        .computeIfAbsent(systemStatusChecker.getServiceHealthCheckCode(), code -> Collections.synchronizedSet(new HashSet<>()));
                if (!VerifyUtil.isEmpty(unhealthyTags)) {
                    currentUnhealthyTags.addAll(unhealthyTags);
                }
            } catch (Throwable t) {
                log.error("system status listener error.", t);
                HEALTH_CHECK_CODE_AND_UNHEALTHY_TAGS_MAP.put(systemStatusChecker.getServiceHealthCheckCode(),
                        new HashSet<>(Collections.singletonList(systemStatusChecker.getClass().getSimpleName())));
            }
        }, initialDelay, period, TimeUnit.MILLISECONDS);
        CHECKER_AND_FUTURE_MAP.put(systemStatusChecker, scheduledFuture);
    }

    /**
     * 停止系统状态检查器
     *
     * @param systemStatusChecker 系统状态检查器
     */
    public static void stopListener(SystemStatusChecker systemStatusChecker) {
        if (VerifyUtil.isEmpty(systemStatusChecker)) {
            return;
        }
        try {
            ScheduledFuture<?> scheduledFuture = CHECKER_AND_FUTURE_MAP.get(systemStatusChecker);
            if (!VerifyUtil.isEmpty(scheduledFuture)) {
                scheduledFuture.cancel(true);
            }
        } catch (Exception e) {
            log.error("cancel system status listener task error", e);
        }
        CHECKER_AND_FUTURE_MAP.remove(systemStatusChecker);
    }

    /**
     * 获取健康状态编码和不健康标签
     *
     * @return 健康状态编码和不健康标签的map
     */
    public static Map<Integer, Set<String>> getHealthCheckCodeAndUnhealthyTagsMap() {
        return new HashMap<>(HEALTH_CHECK_CODE_AND_UNHEALTHY_TAGS_MAP);
    }
}
